一、概述
Spring Cloud Stream由一个中间件中立的核组成。应用通过Spring Cloud Stream插入的input(相当于消费者consumer,它是从队列中接收消息的)和output(相当于生产者producer,它是从队列中发送消息的。)通道与外界交流。通道通过指定中间件的Binder实现与外部代理连接。业务开发者不再关注具体消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务即可。
最底层是消息服务,中间层是绑定层,绑定层和底层的消息服务进行绑定,顶层是消息生产者和消息消费者,顶层可以向绑定层生产消息和和获取消息消费
1.1 相关概念
绑定器
通过定义绑定器作为中间层,实现了应用程序与消息中间件(Middleware)细节之间的隔离。通过向应用程序暴露统一的Channel通过,使得应用程序不需要再考虑各种不同的消息中间件的实现。当需要升级消息中间件,或者是更换其他消息中间件产品时,我们需要做的就是更换对应的Binder绑定器而不需要修改任何应用逻辑 。甚至可以任意的改变中间件的类型而不需要修改一行代码。
Spring Cloud Stream支持各种binder实现 Apache KafkaAmazon KinesisGoogle PubSubSolace PubSub+Azure Event Hubs
发布/订阅模型
在Spring Cloud Stream中的消息通信方式遵循了发布-订阅模式,当一条消息被投递到消息中间件之后,它会通过共享的 Topic 主题进行广播,消息消费者在订阅的主题中收到它并触发自身的业务逻辑处理。这里所提到的 Topic 主题是Spring Cloud Stream中的一个抽象概念,用来代表发布共享消息给消费者的地方。在不同的消息中间件中, Topic 可能对应着不同的概念,比如:在RabbitMQ中的它对应了Exchange、而在Kakfa中则对应了Kafka中的Topic
二、使用示例
生产者
依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>定义bingding 发送消息时需要定义一个接口,不同的是接口方法的返回对象是 MessageChannel,下面是 Spring Cloud Stream 内置的接口:
声明了一个 binding 命名为 “output”。这个binding 声明了一个消息输出流,也就是消息的生产者。
public interface Source {
String OUTPUT = "output";
@Output("output")
MessageChannel output();
}配置
spring:
cloud:
stream:
bindings:
output:
destination: itcast-default
contentType: text/plaincontentType:用于指定消息的类型。具体可以参考 spring cloud stream docs destination:指定了消息发送的目的地,对应 RabbitMQ,会发送到 exchange 是 itcastdefault 的所有消息队列中。
发送消息
@SpringBootApplication
@EnableBinding(Source.class)
public class Application implements CommandLineRunner {
@Autowired
@Qualifier("output")
MessageChannel output;
@Override
public void run(String... strings) throws Exception {
//发送MQ消息
output.send(MessageBuilder.withPayload("hello world").build());
}
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}消费者
依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>定义bingding 在Spring Cloud Stream中接受消息,需要定义一个接口
public interface Sink {
String INPUT = "input";
@Input("input")
SubscribableChannel input();
}注释 @Input 对应的方法,需要返回 SubscribableChannel ,并且参入一个参数值。
配置
spring:
cloud:
stream:
bindings:
input:
destination: itcast-defaultdestination:指定了消息获取的目的地,对应于MQ就是 exchange,这里的exchange就是 itcast-default
接收消息
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
// 监听 binding 为 Sink.INPUT 的消息
@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
System.out.println("监听收到:" + message.getPayload());
}
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}定义一个 class (这里直接在启动类),并且添加注解@EnableBinding(Sink.class) ,其中Sink 就是上述的接口。同时定义一个方法(此处是 input)标明注解为@StreamListener(Processor.INPUT),方法参数为 Message 。
启动后,默认是会创建一个临时队列,临时队列绑定的exchange为 “itcast-default ”,routing key为 “#”。 所有发送 exchange 为“itcast-default ” 的MQ消息都会被投递到这个临时队列,并且触发上述的方法。
自定义消息通道
Spring Cloud Stream 内置了两种接口,分别定义了 binding 为 “input” 的输入流,和 “output” 的输出流
interface OrderProcessor {
String INPUT_ORDER = "inputOrder";
String OUTPUT_ORDER = "outputOrder";
@Input(INPUT_ORDER)
SubscribableChannel inputOrder();
@Output(OUTPUT_ORDER)
MessageChannel outputOrder();
}一个接口中,可以定义无数个输入输出流,可以根据实际业务情况划分。上述的接口,定义了一个订单输入,和订单输出两个 binding。
使用时,需要在 @EnableBinding 注解中,添加自定义的接口。 使用 @StreamListener 做监听的时候,需要指定 OrderProcessor.INPUT_ORDER
配置
spring:
cloud:
stream:
defaultBinder: defaultRabbit
bindings:
inputOrder:
destination: mqTestOrder
outputOrder:
destination: mqTestOrder指定了 destination 为 mqTestOrder 的输入输出流。
消息分组与分区
希望生产者产生的消息只被其中一个实例消费可以使用消息分组功能 在同一个group中的多个消费者只有一个可以获取到消息并消费
server:
port: 7003 #服务端口
spring:
application:
name: rabbitmq-consumer #指定服务名
rabbitmq:
addresses: 127.0.0.1
username: itcast
password: itcast
virtual-host: myhost
cloud:
stream:
bindings:
input:
destination: itcast-default
inputOrder:
destination: testChannel
group: group-2
binders:
defaultRabbit:
type: rabbit同一个特征的数据被同一个实例消费可以使用消息分区功能
消费者配置
- spring.cloud.stream.bindings.input.consumer.partitioned :通过该参数开启消费者分区功能;
- spring.cloud.stream.instanceCount :该参数指定了当前消费者的总实例数量;
- spring.cloud.stream.instanceIndex :该参数设置当前实例的索引号,从0开始,最大值为 spring.cloud.stream.instanceCount 参数 - 1。我们试验的时候需要启动多个实例,可以通过运行参数来为不同实例设置不同的索引值。
cloud:
stream:
instance-count: 2
instance-index: 0
bindings:
input:
destination: itcast-default
inputOrder:
destination: testChannel
group: group-2
consumer:
partitioned: true
binders:
defaultRabbit:
type: rabbit生产者配置
- spring.cloud.stream.bindings.output.producer.partitionKeyExpression :通过该参数指定了分区键的表达式规则,我们可以根据实际的输出消息规则来配置SpEL来生成合适的分区键;
- spring.cloud.stream.bindings.output.producer.partitionCount :该参数指定了消息分区的数量。
spring:
application:
name: rabbitmq-producer #指定服务名
rabbitmq:
addresses: 127.0.0.1
username: itcast
password: itcast
virtual-host: myhost
cloud:
stream:
bindings:
input:
destination: itcast-default
spring:
application:
name: rabbitmq-producer #指定服务名
rabbitmq:
addresses: 127.0.0.1
username: itcast
password: itcast
virtual-host: myhost
cloud:
stream:
bindings:
input:
destination: itcast-default